package com.flink.examples.mysql;

import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.table.api.*;

/**
 * @Description Queries rows from a MySQL table through a JDBC table source and prints them via the Flink Table API
 * @Author JL
 * @Date 2020/09/17
 * @Version V1.0
 */
public class TableSource {

    /** Name of the MySQL table; also the name under which the source is registered in the table environment. */
    private static final String USER_TABLE = "t_user";

    /** SQL statement run against the registered table. */
    private static final String USER_QUERY = "SELECT id,name,age,sex,address,createTimeSeries FROM t_user";

    /**
     * Creates a batch-mode table environment on the Blink planner, registers a JDBC-backed
     * table source under {@code t_user}, runs {@link #USER_QUERY} against it and prints the result.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        // Table environment: Blink planner, batch mode.
        TableEnvironment env = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inBatchMode()
                        .build());

        // Build the schema first, then the JDBC options, then combine both into the table source.
        TableSchema userSchema = userSchema();
        JDBCOptions connectionOptions = connectionOptions();
        JDBCTableSource userSource = JDBCTableSource.builder()
                .setOptions(connectionOptions)
                .setSchema(userSchema)
                .build();

        // Expose the JDBC source to SQL under the table name.
        env.registerTableSource(USER_TABLE, userSource);

        // Run the query, execute it and print the result.
        // (The original author noted tableEnv.executeSql(sql) as an alternative way to obtain the result.)
        env.sqlQuery(USER_QUERY).execute().print();
    }

    /**
     * Declares the field names and data types of the table view.
     * A {@code createTime} TIMESTAMP field is deliberately not declared here;
     * only {@code createTimeSeries} (BIGINT) is mapped.
     *
     * @return the schema describing the columns read from the table
     */
    private static TableSchema userSchema() {
        return TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .field("sex", DataTypes.INT())
                .field("address", DataTypes.STRING())
                .field("createTimeSeries", DataTypes.BIGINT())
                .build();
    }

    /**
     * Assembles the JDBC data source options (driver, URL, credentials, table name)
     * from the values held in {@code MysqlConfig}.
     *
     * @return the JDBC options for the source table
     */
    private static JDBCOptions connectionOptions() {
        return JDBCOptions.builder()
                .setDriverName(MysqlConfig.DRIVER_CLASS)
                .setDBUrl(MysqlConfig.SOURCE_DRIVER_URL)
                .setUsername(MysqlConfig.SOURCE_USER)
                .setPassword(MysqlConfig.SOURCE_PASSWORD)
                .setTableName(USER_TABLE)
                .build();
    }

}
/*
+-------------+--------------------------------+-------------+-------------+--------------------------------+----------------------+
|          id |                           name |         age |         sex |                        address |     createTimeSeries |
+-------------+--------------------------------+-------------+-------------+--------------------------------+----------------------+
|           8 |                           liu3 |          21 |           1 |                             CN |        1598889600000 |
|           9 |                          wang1 |          23 |           1 |                             CN |        1598976000000 |
+-------------+--------------------------------+-------------+-------------+--------------------------------+----------------------+
 */